PPO Implementation for Coders

RL
Code
Python
Author

Arpan Pallar

Published

June 14, 2023

PPO is state of the art algorithm for DeepRL. We ll see how this can be implemented in code(https://github.com/Arpan12/Reinforcement-Learning-Algos/blob/main/PPO_Discrete.ipynb)

We wont go in depth into theory but focus on implementation.

Promise

After reading till the end of the blog, you should be able to implement PPO for discrete action space based environments.

Theory

PPO is a policy gradient method

\(\pi(u|x)\to\)our current policy. This in deep RL is usually approximate by a Neural Net called actor net

\(g(x_n,u_n)\to\) cost incurred, we get from the environment for taking action

\(V_\pi(x) = g(x,\pi(x)) + \alpha*V_\pi(x+1)\) = value function for policy \(\pi\) . This in deep RL is usually approximate by a Neural Net called Critic net

if \(J(\theta) = V_\pi(x_0)\) approximate value function for an episode starting at x0 following policy \(\pi(\theta)\) ,We can decrease \(J(\theta)\) by gradient descent \(\theta_{new} = \theta - \gamma*\Delta J(\theta)\)

so writing more mathematically

\(J(\theta) = E_{u_n}\sim \pi_{\theta}[\sum_{n=0}^N\alpha^n*g(x_n,u_n)]\)

we are able to show(proof not important)

\(\Delta_\theta J(\theta) = E[\sum_{n=0}^N\Psi_n\Delta_nlog\ \pi(u_n|x_n,\theta)]\)

which is similar to minimizing:

\(min_\theta E[\Psi_n*log\ \pi(u_n|x_n,|\theta)]\)

This is what we would use for actor loss and minimize. In Practice since we get rewards from the Env so we put a negative sign before \(\Psi_n\) so that we maximize the reward gain

For critic loss we take \(\Delta t_d^2\) where \(t_d = g(x_n,u_n)+\gamma*V(x_{n+1)}-V(x_n)\)

for

  1. ReInforce: \(\Psi_n = G_n = \sum_{k=n}^N \alpha^k * g(x_k,u_k)\)

  2. ReInforce with baseLine: \(\Psi_n = G_n-V(x_n)\)

  3. Actor-Critic: \(\Psi_n = g(x_n,u_n)+\alpha*V(x_{n+1})-V(x_n)\)

  4. PPO: \(\Psi_n = A_t = \sum_{t=k}^N (\lambda *\mu)^{t-k}*( Q(x_n,u_n)-V(x_n))\)

    where \(Q(x_n,u_n) = g(x_n,u_n)+\gamma*V({x_{n+1}})\)

    but for PPO we approximate the log gradient even further by \(min_\theta E[\sum_{n=0}^N A_n\frac{\pi(u_n|x_n,Theta)}{\pi(u_n| x_n,\theta_{old})}]\)

For PPO we generally clip the gradient by limiting between 1-\(\epsilon\) and 1+\(\epsilon\)

\(min_\theta E[\sum_{n=0}^N min(A_n\ \frac{\pi(u_n|x_n,Theta)}{\pi(u_n| x_n,\theta_{old}},clip(\frac{\pi(u_n|x_n,Theta)}{\pi(u_n| x_n,\theta_{old}},1-\epsilon,1+\epsilon)A_n)]\)

Implementation

Make Actor and Critic NN

actor NN: takes in a state(you need to make a tensor out of your state.Cant pass a list or np array) and spits out a Categorical object containing probs(because the last layer was softmax on action_dim)

for eg: in our case our observation is (4,) python list

>>> print(f"observation= {observation}")
observation= [ 0.02916372  0.02608052  0.01869606 -0.00384168]

we need to convert them to Tensor before passing them to NN

>>> state = T.tensor([observation],dtype=T.float).to(self.actor.device)
>>> print(f"state= {state}")
state= tensor([[ 0.0292,  0.0261,  0.0187, -0.0038]], device='cuda:0')

we get a tensor with Probability dist from NN(last layer was softmax). We then get an action by sampling the dist object

>>> dist = self.actor(state)
>>> print(f"dist {dist} {dist.probs}")
dist Categorical(probs: torch.Size([1, 2])) tensor([[0.4913, 0.5087]], device='cuda:0', grad_fn=<DivBackward0>)

>>> action = dist.sample()
>>> print(f"action = {action}")
action = tensor([1], device='cuda:0')

# Retrieve action from action tensor and log_probs
# squeeze removes all axises having value 1. item() returns a standard python float
>>>  probs = T.squeeze(dist.log_prob(action)).item()
>>>  action = T.squeeze(action).item()
>>> print(f"log prob {dist.log_prob(action)} squeezed = {T.squeeze(dist.log_prob(action))} Probs = {probs}")
log prob tensor([-0.7533], device='cuda:0', grad_fn=<SqueezeBackward1>) squeezed = -0.753328263759613 Probs = -0.753328263759613

The code For Actor class. It derives from Torch.nn.Module

class AgentNetwork(nn.Module):
    def __init__(self,input_dims,action_dim,lr,layer1=256,layer2=256,weight_file='weightFiles/ppo_discrete'):
        super(AgentNetwork,self).__init__()
        self.checkpoint_file = os.path.join(weight_file,'ppo_actor_weight')
        #TOCHECK: *input_dims vs input_dims
        self.actor = nn.Sequential(
                nn.Linear(*input_dims,layer1),
                nn.ReLU(),
                nn.Linear(layer1,layer2),
                nn.ReLU(),
                nn.Linear(layer2,action_dim),
                nn.Softmax(dim=-1)               
        )
        
        self.optimizer = optim.Adam(self.parameters(),lr=lr)
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)
    
    def forward(self,state):
        dist = self.actor(state)
        #TOCHECK: what does categorical do
        dist = Categorical(dist)
        return dist
    
    def save_checkpoint(self):
        T.save(self.state_dict(),self.checkpoint_file)
    
    def load_checkpoint(self):
        self.load_state_dict(T.load(self.checkpoint_file))

Critic NN: takes in a state(tensor) and returns a tensor containing a single float corresponding to Value of that state

>>> state = T.tensor([observation],dtype=T.float).to(self.actor.device)
>>> value = self.critic(state)
>>> print(f"value {value}")
value tensor([[-0.0977]], device='cuda:0', grad_fn=<AddmmBackward0>)

Code for Critic Class

 class CriticNetwork(nn.Module):
    def __init__(self,input_dims,lr,layer1=256,layer2=256,weight_file='weightFiles/ppo_discrete'):
        super(CriticNetwork,self).__init__()
        self.checkpoint_file = os.path.join(weight_file,'ppo_critic_weight')
        self.critic = nn.Sequential(
                nn.Linear(*input_dims,layer1),
                nn.ReLU(),
                nn.Linear(layer1,layer2),
                nn.ReLU(),
                nn.Linear(layer2,1)
        )
        self.optimizer = optim.Adam(self.parameters(),lr=lr)
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)
        
    def forward(self,state):
        value = self.critic(state)
        return value
    
    def save_checkpoint(self):
        T.save(self.state_dict(),self.checkpoint_file)
    
    def load_checkpoint(self):
        self.load_state_dict(T.load(self.checkpoint_file))

Batches and How to generate them

We need to take all the elements in the memory and create batches out of them.

GOAL

1.The elements should not repeat from one batch to another

2.There should be any correlation between elements of same batch.

Here is how we achieve that in code

  def generate_batches(self):
        n_states = len(self.states)
        batches=[]
        i=0
        indices = np.arange(n_states,dtype = np.int64)
        np.random.shuffle(indices)
        for i in range(n_states):
            batches.append(indices[i:i+self.batch_size])
            i+=self.batch_size
        return np.array(self.states),\
                np.array(self.actions),\
                np.array(self.probs), \
                np.array(self.vals),\
                np.array(self.rewards), \
                np.array(self.dones),\
                batches
#Note here only batch index get suffled. We need to have the actual order of states,actions,probs,vals,rewards and done so that we can calculate advatage of each state later

compute the advantage

We know that advantage of a state is given by following equation

\(A_t = \sum_{t=k}^N (\lambda *\mu)^{t-k}*( Q(x_n,u_n)-V(x_n))\)

Here is how we achieve that in code

state_arr,action_arr,probs_arr,vals_arr, \
    rewards_arr,dones_arr,batches = self.memory.generate_batches()
            
advantages=np.zeros_like(rewards_arr)
            
for t in reversed(range(len(state_arr)-1)):
    advantages[t] = rewards_arr[t]+self.gamma*vals_arr[t+1]*(1-int(dones_arr[t]))-vals_arr[t] + self.gamma*self.lambda_factor*advantages[t+1]
                
advantages = T.tensor(advantages).to(self.actor.device)

loss function

for actor loss,we know that we need to minimize

\(min_\theta E[\sum_{n=0}^N min(A_n\ \frac{\pi(u_n|x_n,Theta)}{\pi(u_n| x_n,\theta_{old}},clip(\frac{\pi(u_n|x_n,Theta)}{\pi(u_n| x_n,\theta_{old}},1-\epsilon,1+\epsilon)A_n)]\)

so we can find gradient of this function with respect to our weights \(\Theta\) and do a 1 step gradient descent

This can be done in Code by

 states = T.tensor(state_arr[batch],dtype = T.float).to(self.actor.device)
actions = T.tensor(action_arr[batch],dtype = T.float).to(self.actor.device)
old_probs = T.tensor(probs_arr[batch],dtype = T.float).to(self.actor.device)
dist = self.actor(states)
new_probs = dist.log_prob(actions)
#TOCHECK: what do exp() do
                
prob_ratio = new_probs.exp()/old_probs.exp()
                
weighted_prob = advantages[batch]*(prob_ratio)
                
weighted_clipped_probs = T.clamp(prob_ratio,1-self.policy_clip,1+self.policy_clip)*advantages[batch]
                
actor_loss = - T.min(weighted_clipped_probs,weighted_prob).mean()

for Critic loss,we know that we need to minimize the error of values of our state \((V_{des} - V_{pred})^2\)

where

\(V_{des} = a_t+value_{theta\_old}\)

\(V_{pred} = value \ from \ critic \ NN\)

This is done in code as

critic_values = T.squeeze(critic_values)
                
desired_state_values = advantages[batch]+values[batch]
critic_loss = (desired_state_values-critic_values)**2
critic_loss = critic_loss.mean()

Then We calculate the total loss and do 1 step gradient descent as

 total_loss = actor_loss+critic_loss*0.5
                
self.actor.optimizer.zero_grad()
self.critic.optimizer.zero_grad()
total_loss.backward()
self.actor.optimizer.step()
self.critic.optimizer.step()

Learning Pseudo Code

PPO_agent_learn:
  for _ in num_of_epoch:
    compute advantage of the states in memory
    generate batchs
    Iterate over batches:
      calculate cummulative actor loss for the batch
      calculate cummulative critic loss for the batch
      do gradient update
  clear Memory